package org.databandtech.sparkstreaming;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;

/*
 * Deployment — spark-submit commands:
./bin/spark-submit --master spark://192.168.13.164:5030 --class org.databandtech.sparkstreaming.App /usr/jar/rt.jar

./bin/spark-submit --class org.databandtech.sparkstreaming.KafkaApp /usr/jar/rt.jar
 --jars spark-sql-kafka-0-10_2.12-3.1.2.jar
./bin/spark-submit --class org.databandtech.sparkstreaming.KafkaApp /usr/jar/rt.jar

 */
/**
 * Structured Streaming consumer that reads playback-log JSON records from a
 * Kafka topic, parses them against a fixed schema, and runs three windowed
 * console queries: raw parsed rows, counts per operator/action type, and
 * counts per operator/area code (filtered by {@link #FILTERINGTHRESHOLD}).
 */
public class KafkaApp {

	/** Aggregated rows must have count strictly greater than this to be kept. */
	static final int FILTERINGTHRESHOLD = 1;

	public static void main(String[] args) {

		System.out.println("kafka source start...");

		SparkSession spark = SparkSession.builder().appName("kafkaconsumer").master("local[*]").getOrCreate();

		// Schema of the JSON payload carried in the Kafka record value.
		// All fields are ingested as strings; log_time etc. are parsed downstream.
		StructType dataSchema = new StructType().add("action_type", "string").add("sys_id", "string")
				.add("user_id", "string").add("user_group_id", "string").add("epg_group_id", "string")
				.add("stb_ip", "string").add("stb_id", "string").add("stb_type", "string").add("stb_mac", "string")
				.add("terminal_type", "string").add("log_time", "string").add("mediacode", "string")
				.add("definition", "string").add("bitrate", "string").add("start_time", "string")
				.add("currentplaytime", "string").add("refer_type", "string").add("refer_page_id", "string")
				.add("area_code", "string");

		// Kafka source: subscribe to "Hello-Kafka" and replay from the earliest offsets.
		Dataset<Row> df = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "hadoop001:9092")
				.option("subscribe", "Hello-Kafka")
				// .option("subscribePattern", "topic.*")
				.option("startingOffsets", "earliest")
				// .option("endingOffsets", "latest")
				.load();

		// Approach 1: parse the JSON directly with from_json.
		// (Approach 2 — mapping each value through Gson into a bean with
		// Encoders.bean(EpgVod.class) — was removed; see repository history.)
		Map<String, String> jsonOptions = new HashMap<>();
		jsonOptions.put("timestampFormat", "yyyy-MM-dd HH:mm:ss");
		Dataset<Row> parsed = df.select(
				functions.from_json(df.col("value").cast("string"), dataSchema, jsonOptions).alias("parsed_value"),
				df.col("timestamp"));

		// Use ingestion time (the Kafka "timestamp" column) as a rough,
		// approximate substitute for precise event time + watermark handling.
		Dataset<Row> selected = parsed.select("parsed_value.*", "timestamp");
		// Event-time alternative, kept for reference:
		// .withColumn("dtime", functions.to_timestamp(functions.from_unixtime(
		//     parsed.col("parsed_value.log_time").substr(0, 10), "yyyy-MM-dd HH:mm:ss"),
		//     "yyyy-MM-dd HH:mm:ss"));

		// Echo the parsed rows to the console.
		startConsoleQuery(selected);

		// Counts per operator (sys_id) + action type over 1-minute windows.
		Dataset<Row> onlineCountsBySysid = selected.groupBy(functions.window(selected.col("timestamp"), "1 minutes"),
				selected.col("sys_id"), selected.col("action_type")).count();
		startConsoleQuery(onlineCountsBySysid);

		// Counts per operator + area code over 1-minute windows; drop aggregated
		// rows whose count is too small to be interesting (<= FILTERINGTHRESHOLD).
		Dataset<Row> onlineCountsByArea = selected.groupBy(functions.window(selected.col("timestamp"), "1 minutes"),
				selected.col("sys_id"), selected.col("area_code")).count();
		onlineCountsByArea = onlineCountsByArea.filter(onlineCountsByArea.col("count").$greater(FILTERINGTHRESHOLD));
		startConsoleQuery(onlineCountsByArea);

		// Block until any of the started queries terminates or fails.
		try {
			spark.streams().awaitAnyTermination();
		} catch (StreamingQueryException e) {
			e.printStackTrace();
		}

	}

	/**
	 * Starts a console sink in "update" output mode for the given streaming
	 * Dataset.
	 *
	 * <p>A query that fails to start makes the job pointless, so the checked
	 * {@link TimeoutException} is rethrown as an unchecked exception (with its
	 * cause preserved) instead of being silently swallowed.
	 *
	 * @param dataset the streaming Dataset to write to the console
	 * @return the running {@link StreamingQuery}
	 */
	private static StreamingQuery startConsoleQuery(Dataset<Row> dataset) {
		try {
			return dataset.writeStream().outputMode("update").format("console").start();
		} catch (TimeoutException e) {
			throw new IllegalStateException("Failed to start streaming console query", e);
		}
	}

}
